home *** CD-ROM | disk | FTP | other *** search
Wrap
# Source Generated with Decompyle++
# File: in.pyc (Python 2.4)
"""IMAP4 proxies that classify mail as it passes between client and server.

Two proxies are provided:

* ``SEIMAP4Proxy`` rewrites FETCH responses so that fetched messages carry
  the classification headers.
* ``SEBlockingIMAP4Proxy`` additionally opens its own connection to the
  server after a successful login and filters the known folders in a
  background thread (see ``IMAPFilter``).

NOTE(review): this module was recovered from decompiled Python 2.4 bytecode.
A few expressions were lost by the decompiler and have been reconstructed
from the surrounding code; each of them carries a ``NOTE(review)`` comment
and should be confirmed against the original source if it is available.

The code targets Python 2.  Exceptions are picked up with
``sys.exc_info()[1]`` rather than the ``except X, e`` form so that the
handlers read the real exception object on every interpreter version.
"""

import os
import re
import sys
import md5
import time
import copy
import email
import base64
import socket
import string
import thread
import imaplib
import traceback
import threading
import email.Header

try:
    frozenset
except NameError:
    from sets import ImmutableSet as frozenset

from spambayes import Dibbler
from spambayes import storage
from spambayes.message import insert_exception_header, PERSISTENT_HAM_STRING
from spamexperts import Options

# Importing sb_server is done with the 'verbose' option switched off; the
# previous value is restored straight afterwards.
verbose = Options.options[('globals', 'verbose')]
Options.options[('globals', 'verbose')] = False
from spambayes.scripts.sb_server import ServerLineReader
Options.options[('globals', 'verbose')] = verbose
del verbose

from spamexperts import ProxyClassifier
from spamexperts.message import SEHeaderMessage
from se_config import spamexpertsConfig as configuration
from spamexperts.OptionsClass import IS_HAM, IS_SPAM, BLOCKED, IS_UNSURE


def _log_socket_error(error):
    """Report an unexpected socket error (with traceback when verbose)."""
    print >>sys.stderr, 'Unexpected socket error:', str(error)
    if Options.options[('globals', 'verbose')]:
        traceback.print_exc(None, sys.stderr)


def _socket_errno(error):
    """Return the error number of a socket.error, or None if it has none."""
    if error.args:
        return error.args[0]
    return None


def _tagged_status(response):
    """Return the upper-cased status word (second token) of the last
    non-empty line of `response`, or '' if there is none.

    A response may contain untagged lines before the tagged completion
    line, so the last line is the one that says whether the command
    succeeded.
    """
    lines = [line for line in response.splitlines() if line.strip()]
    if not lines:
        return ''
    words = lines[-1].split()
    if len(words) < 2:
        return ''
    return words[1].upper()


class IMAP4ProxyBase(Dibbler.BrighterAsyncChat):
    """An async dispatcher that understands IMAP4 and proxies to an IMAP4
    server, calling `self.onTransaction(command_id, command, args,
    response)` for each transaction.  Generally similar to the
    POP3ProxyBase class.

    self.onTransaction() should return the response to pass back to the
    email client - the response can be the verbatim response or a
    processed version of it.
    """

    def __init__(self, clientSocket, serverName, serverPort, ssl = False):
        Dibbler.BrighterAsyncChat.__init__(self, clientSocket)
        self.request = ''
        self.response = ''
        self.set_terminator('\r\n')
        self.command = ''
        self.command_id = None
        self.args = []
        self.uid = False
        self.isClosing = False
        self.serverName = serverName
        self.serverPort = serverPort
        if not self.onIncomingConnection(clientSocket):
            # An untagged BYE is how an IMAP server refuses a connection;
            # the POP3-style '-ERR' is not valid IMAP.
            self.push('* BYE Connection not allowed\r\n')
            self.close_when_done()
            return
        self.serverSocket = ServerLineReader(serverName, serverPort,
                                             self.onServerLine, ssl)

    def close(self):
        # serverSocket is missing when the connection was refused.
        if hasattr(self, 'serverSocket'):
            self.serverSocket.close()
        Dibbler.BrighterAsyncChat.close(self)

    def onIncomingConnection(self, clientSocket):
        '''Checks the security settings.

        Returns True if the remote address is the local address or matches
        one of the comma-separated patterns in the
        ('imap4proxy', 'allow_remote_connections') option ('*' stands for
        any octet; a lone '*' allows everybody).
        '''
        remoteIP = clientSocket.getpeername()[0]
        trustedIPs = Options.options[('imap4proxy', 'allow_remote_connections')]
        if trustedIPs == '*' or remoteIP == clientSocket.getsockname()[0]:
            return True
        # Turn the shell-like patterns into regular expressions.  The octet
        # expression must accept 0-255 (200-249 is '2[0-4]\d').
        trustedIPs = trustedIPs.replace('.', '\\.').replace(
            '*', '([01]?\\d\\d?|2[0-4]\\d|25[0-5])')
        for trusted in trustedIPs.split(','):
            if re.search('^' + trusted.strip() + '$', remoteIP):
                return True
        return False

    def onTransaction(self, command_id, command, args, response):
        '''Override this.  Takes the raw request and the response, and
        returns the (possibly processed) response to pass back to the
        email client.
        '''
        raise NotImplementedError

    def onServerLine(self, line):
        '''A line of response has been received from the IMAP4 server.'''
        self.response = self.response + line
        if not line:
            # An empty line means the server has closed the connection.
            self.isClosing = True
            self.onResponse()
            self.response = ''
            return
        if not self.command:
            # Nothing is being tracked, so pass the data straight through.
            self.push(self.response)
            self.response = ''
        elif line.startswith('+ '):
            # Continuation request: the client has to answer it.
            self.push(self.response)
            self.response = ''
        elif self.command_id and line.startswith(self.command_id + ' '):
            # Tagged completion line: the transaction is complete.
            self.onResponse()
            self.response = ''

    def collect_incoming_data(self, data):
        '''Asynchat override.'''
        self.request = self.request + data

    def found_terminator(self):
        '''Asynchat override.

        Forwards the client's line to the server and records the tag,
        command and arguments so the response can be matched up later.
        '''
        if hasattr(self, 'serverSocket'):
            self.serverSocket.push(self.request + '\r\n')
        stripped = self.request.strip()
        if stripped == '':
            self.command = ''
            self.args = []
        else:
            splitCommand = stripped.split()
            if len(splitCommand) < 3:
                # Too short to be tracked as a new command; the tokens are
                # added to the current argument list.
                self.args += tuple(splitCommand)
            else:
                self.command_id = splitCommand[0]
                self.command = splitCommand[1].upper()
                self.args = splitCommand[2:]
                if self.command == 'UID':
                    self.uid = True
                    self.command = self.args[0].upper()
                    self.args = self.args[1:]
                else:
                    self.uid = False
        self.request = ''

    def onResponse(self):
        '''Hand the completed response to onTransaction() and reset.'''
        if self.response:
            cooked = self.onTransaction(self.command_id, self.command,
                                        self.args, self.response)
            self.push(cooked)
        if self.isClosing:
            self.close_when_done()
        self.command_id = None
        self.command = ''
        self.args = []
        self.isClosing = False


class SEIMAP4Proxy(IMAP4ProxyBase, ProxyClassifier.ProxyClassifier):
    '''Proxies between an email client and an IMAP4 server, inserting
    judgement headers.  It acts on the following IMAP4 commands:

     o FETCH:
        o If the component(s) being fetched is(are) message(s) then adds
          the judgement header based on the raw headers and body of the
          message.
     o LOGIN:
        o Does no processing based on the LOGIN command itself, but
          expires any old messages in the two caches.
     o AUTHENTICATE:
        o Same as LOGIN.
    '''

    def __init__(self, clientSocket, serverName, serverPort, state, ssl = False):
        self.skip_logging = None
        self.authenticating = None
        self.isClosed = False
        # send() reads self.state, and the base __init__ may already push
        # data to the client, so the state must be in place first.
        self.state = state
        self.add_when_cooking = True
        ProxyClassifier.ProxyClassifier.__init__(self)
        IMAP4ProxyBase.__init__(self, clientSocket, serverName, serverPort, ssl)
        self.handlers = {
            'FETCH': self.onFetch,
            'LOGIN': self.onLogin,
            'AUTHENTICATE': self.onLogin }
        self.state.totalSessions += 1
        self.state.activeSessions += 1
        self.state.proxies.append(self)

    def send(self, data):
        '''Logs the data to the log file, then sends it to the client.

        While a FETCH is in progress (self.skip_logging holds its tag) a
        placeholder is logged instead of the data.
        '''
        log = self.state.imapLogFile
        if self.skip_logging:
            log.write('...FETCH CONTENTS...\r\n')
            # Logging resumes once the tagged completion line has gone by.
            split_data = data.split('\n')
            if len(split_data) == 1 and \
               split_data[0].startswith(self.skip_logging):
                self.skip_logging = False
            elif len(split_data) > 1 and \
                 split_data[-2].startswith(self.skip_logging):
                self.skip_logging = False
        else:
            log.write(data)
        log.flush()
        try:
            return IMAP4ProxyBase.send(self, data)
        except socket.error:
            self.close()
            return 0

    def recv(self, size):
        '''Receives data from the client and logs it (with credentials
        masked) to the log file.'''
        try:
            data = IMAP4ProxyBase.recv(self, size)
        except socket.error:
            error = sys.exc_info()[1]
            # 10053 (connection aborted, Winsock numbering) is not
            # reported; anything else is.
            if _socket_errno(error) != 10053:
                _log_socket_error(error)
            self.close()
            return ''
        self.state.imapLogFile.write(self._loggable(data))
        self.state.imapLogFile.flush()
        return data

    def _loggable(self, data):
        '''Return the text to log for `data`, updating the logging state.'''
        words = data.split()
        check_data = [word.lower() for word in words]
        if getattr(self, 'authenticating', None) and check_data:
            if check_data[0] == self.authenticating:
                self.authenticating = None
                return data
            return 'XXX AUTHENTICATION DETAILS XXX\r\n'
        if len(check_data) > 1 and check_data[1].startswith('login'):
            return check_data[0] + ' LOGIN XXXXXXXX XXXXXXXX\r\n'
        if len(check_data) > 1 and check_data[1].startswith('authenticate'):
            self.authenticating = check_data[0]
            return data
        is_fetch = len(check_data) > 1 and check_data[1].startswith('fetch')
        is_uid_fetch = (len(check_data) > 2 and
                        check_data[1].startswith('uid') and
                        check_data[2].startswith('fetch'))
        if is_fetch or is_uid_fetch:
            # send() compares this against the server's tagged line, so the
            # tag has to keep its original case.
            self.skip_logging = words[0]
        return data

    def close(self):
        if not self.isClosed:
            self.isClosed = True
            self.state.activeSessions -= 1
            # The base class also closes the server socket (if any).
            IMAP4ProxyBase.close(self)
            try:
                self.state.proxies.remove(self)
            except ValueError:
                pass

    def onTransaction(self, command_id, command, args, response):
        '''Takes the raw request and response, and returns the (possibly
        processed) response to pass back to the email client.
        '''
        handler = self.handlers.get(command, self.onUnknown)
        return handler(command_id, command, args, response)

    find_header = re.compile('(\\d+)\\s+fetch.*(?:BODY(?:\\.PEEK)?\\[HEADER\\])\\s+{(\\d+)\\}\\r?\\n(.*)', re.DOTALL | re.IGNORECASE)
    find_body = re.compile('fetch.*(?:BODY(?:\\.PEEK)?\\[\\]|RFC822)\\s+\\{(\\d+)\\}\\r?\\n(.*)', re.DOTALL | re.IGNORECASE)
    find_size = re.compile('fetch.*(rfc822.size\\s+)(\\d*)', re.DOTALL | re.IGNORECASE)
    replace_size = re.compile('rfc822.size\\s+(\\d*)', re.DOTALL | re.IGNORECASE)

    def onFetch(self, unused, unused2, unused3, response):
        '''If any messages are being fetched, then adds the judgement
        header based on the raw headers and body of the message(s).  If
        something else (e.g. flags) are being fetched, then just proxies
        straight through.'''
        self.state.model_notifier.SetBeginUpdating()
        try:
            mo = self.find_body.search(response)
            if mo:
                literal_size = int(mo.group(1))
                literal = mo.group(2)[:literal_size]
                new_version = self.body_cooker(literal)
                response = response.replace(literal, new_version)
                # The literal's announced size must match the cooked text.
                response = response.replace('{%d}' % (literal_size,),
                                            '{%d}' % (len(new_version),))
            mo = self.find_size.search(response)
            if mo and mo.group(2):
                rfc822_wording = mo.group(1)
                rfc822_size = mo.group(2)
                new_version = '%s%s' % (rfc822_wording,
                                        self.size_cooker(rfc822_size))
                response = self.replace_size.sub(new_version, response)
            if configuration.eudora_compatibility:
                mo = self.find_header.search(response)
                if mo:
                    msg_id = mo.group(1)
                    literal_size = int(mo.group(2))
                    literal = mo.group(3)[:literal_size]
                    new_version = self.header_cooker(msg_id, literal)
                    response = response.replace(literal, new_version)
                    response = response.replace('{%d}' % (literal_size,),
                                                '{%d}' % (len(new_version),))
        finally:
            # Always paired with SetBeginUpdating, even if a cooker fails.
            self.state.model_notifier.SetEndUpdating()
        return response

    def size_cooker(self, old_size):
        '''Return the reported size increased by HEADER_SIZE_FUDGE_FACTOR.'''
        return str(int(old_size) + self.HEADER_SIZE_FUDGE_FACTOR)

    def generate_id(self, messageText):
        '''Return an id for the message: the MD5 of its text as a decimal
        string.'''
        return str(int(md5.md5(messageText).hexdigest(), 16))

    split_header_re = re.compile('\\n\\r?\\n')

    def header_cooker(self, msg_id, headerText):
        '''Return cooked headers for message `msg_id`.

        Only the headers were fetched by the client, so the whole message
        is fetched from the server here, classified, and the headers of the
        cooked message are returned.  On any failure `headerText` is
        returned unchanged.
        '''
        try:
            self.serverSocket.send('sehc FETCH %s (RFC822)\n' % (msg_id,))
        except socket.error:
            _log_socket_error(sys.exc_info()[1])
            return headerText
        chunks = []
        tail = ''
        while True:
            try:
                data = self.serverSocket.recv(1024)
            except socket.error:
                error = sys.exc_info()[1]
                # 10035 (would block, Winsock numbering): just try again.
                if _socket_errno(error) == 10035:
                    continue
                _log_socket_error(error)
                return headerText
            if not data:
                # The server went away; use whatever has been received.
                break
            chunks.append(data)
            # The completion tag may be split across two reads.
            if '\nsehc' in tail + data:
                break
            tail = (tail + data)[-4:]
        response = ''.join(chunks)
        marker = response.find('RFC822 {')
        if marker < 0:
            return headerText
        temp = response[marker + 8:]
        temp2 = temp.find('}')
        if temp2 < 0:
            return headerText
        try:
            length = int(temp[:temp2])
        except ValueError:
            return headerText
        # The literal starts after '}' and the CRLF that follows it.
        start = temp2 + 3
        cooked = self.body_cooker(temp[start:start + length])
        return self.split_header_re.split(cooked, 1)[0] + '\n\r\n'

    def body_cooker(self, messageText):
        '''Classify `messageText` and return it with the classification
        headers added.  The message is also added to the matching corpus
        when caching applies.  If anything goes wrong, the message is
        returned with an exception header instead.
        '''
        try:
            msg = email.message_from_string(messageText,
                                            _class = SEHeaderMessage)
            msg.setId(self.generate_id(messageText))
            (score, classification, clues) = self.classify_message(msg)
            ham_string = Options.options[('Headers', 'header_ham_string')]
            spam_string = Options.options[('Headers', 'header_spam_string')]
            unsure_string = Options.options[('Headers', 'header_unsure_string')]
            if classification == IS_HAM:
                classification = ham_string
            elif classification == IS_SPAM:
                classification = spam_string
            elif classification == IS_UNSURE:
                classification = unsure_string
            # NOTE(review): the two flags below were 'and' chains that the
            # decompiler flattened; reconstructed from the fragments.
            isSuppressedBulkHam = (
                not configuration.block_spam and
                classification == ham_string and
                Options.options[('Storage', 'no_cache_bulk_ham')] and
                msg.get('precedence') in ['bulk', 'list'])
            size_limit = Options.options[('Storage', 'no_cache_large_messages')]
            # A limit of zero (or less) means "no limit".
            isTooBig = size_limit > 0 and len(messageText) > size_limit
            msg.RememberClassification(classification)
            msg.addHeaders(prob = score, clues = clues)
            tid = msg.getId()
            if self.add_when_cooking and \
               Options.options[('Storage', 'cache_messages')] and \
               not isSuppressedBulkHam and not isTooBig:
                if classification == spam_string:
                    self.state.numSpams += 1
                    print >>sys.stderr, 'Store msg %s in Spamcorpus %s' % (tid, classification)
                    corpus = self.state.spamCorpus
                elif classification == unsure_string:
                    print >>sys.stderr, 'Store msg %s in Unsurecorpus %s' % (tid, classification)
                    corpus = self.state.unsureCorpus
                else:
                    self.state.numHams += 1
                    print >>sys.stderr, 'Store msg %s in Hamcorpus %s' % (tid, classification)
                    corpus = self.state.hamCorpus
                message = corpus.makeMessage(msg.getId(), msg.as_string())
                corpus.addMessage(message)
            messageText = msg.as_string()
        except Exception:
            # Deliberately broad: mail must still be delivered, so the
            # failure is recorded in the message itself.
            (messageText, details) = insert_exception_header(messageText)
            print >>sys.stderr, details
        return messageText

    def onLogin(self, unused, unused2, args, response, do_welcome = True):
        '''Spins off two separate threads that expires any old messages
        in the caches, but does not do any processing of the LOGIN
        command itself.'''
        # The authentication exchange is over, so stop masking client data.
        self.authenticating = None
        thread.start_new_thread(self.state.spamCorpus.removeExpiredMessages, ())
        thread.start_new_thread(self.state.hamCorpus.removeExpiredMessages, ())
        if args and _tagged_status(response) == 'OK':
            self.current_account = '%s_%s_IMAP' % (args[0], self.serverName)
            if self.current_account not in self.state.delayed_messages and \
               do_welcome:
                self.state.delayed_messages[self.current_account] = { }
                self.state.delayed_messages.store()
                welcomeText = self.get_welcome_message(args[0], self.serverName)
                # First login for this account: append the welcome message
                # to the INBOX, talking to the server synchronously.
                self.serverSocket.setblocking(True)
                try:
                    self.serverSocket.send('welcome APPEND INBOX {%d}\r\n' % (len(welcomeText),))
                    self.serverSocket.recv(1024)
                    self.serverSocket.send(welcomeText + '\r\n')
                    self.serverSocket.recv(1024)
                finally:
                    self.serverSocket.setblocking(False)
        return response

    def onUnknown(self, unused, unused2, unused3, response):
        """Default handler; returns the server's response verbatim."""
        return response


class IMAPFilter(object):
    '''Mixin that filters the folders of an account over a separate
    imaplib connection, recording what it finds in the message info
    database.

    Expects the class it is mixed into to provide `state`,
    `current_account` and `processing_queue`.
    '''

    # Key, within an account's blocked-messages record, of the time of the
    # first access.
    initial_key = 'initial_access'
    # Folders to filter; replaced by load_folders_to_filter().
    filter_folders = ()
    # Set to True to make the folder update stop early.
    terminated = False

    # NOTE(review): the original character set was lost in decompilation;
    # ASCII letters and digits is an assumption - confirm before relying
    # on keys that were stored by an earlier version.
    _KEY_SAFE_CHARS = string.ascii_letters + string.digits

    def close(self):
        try:
            self.state.proxies.remove(self)
        except ValueError:
            pass

    def load_folders_to_filter(self):
        '''Set self.filter_folders from the saved options.

        NOTE(review): reconstructed as the inverse of
        save_folders_to_filter(), which stores 'account|folder' entries;
        the original expression was lost in decompilation.
        '''
        prefix = self.current_account + '|'
        stored = Options.options[('imap4proxy', 'filter_folders')]
        folders = [entry[len(prefix):] for entry in stored
                   if entry.startswith(prefix)]
        self.filter_folders = tuple(folders)

    def updateMessageDatabase(self, imap_connection):
        """Run through the IMAP server (the known folders) and update our
        local information about whether messages are spam or not.

        This will take a considerable amount of time, so we cache this
        information, and don't reconsider messages we have already looked
        at.

        Download a copy (and delete from the server) any spam messages,
        and send these to the 'blocked' database.

        Perhaps we should reclassify all messages whenever the database
        changes - after all, it's possible that the classification will
        change.  This might mean releasing messages automatically (since
        we took them off the server) and would be complicated and
        time-consuming.  For now, we do not do this.
        """
        i = imap_connection
        try:
            for folder in self.filter_folders:
                try:
                    self.updateMessageDatabaseFolder(i, folder)
                except Exception:
                    # Errors are expected once we have been told to stop.
                    if self.terminated:
                        break
                    raise
        finally:
            # Whatever happened, release the connection and the marker so
            # that the next login can filter again.
            try:
                i.logout()
            except Exception:
                pass
            try:
                self.state.open_remote_connections.remove(self.current_account)
            except ValueError:
                print >>sys.stderr, 'Connection was missing:', self.current_account

    class _dummy_msg(object):
        '''Dummy message class used to check if a message is in the
        message info database.'''

        def __init__(self, key):
            self.getDBKey = lambda : key
            self.stored_attributes = [
                'c', 't', 'block_state', 'account', 'date_modified',
                'internaldate', 'flags', 'folder_name', 'uid']
            for att in self.stored_attributes:
                setattr(self, att, None)

    uid_re = re.compile('(?P<id>\\d+) \\(UID (?P<data>\\d+)\\)', re.IGNORECASE)
    flags_re = re.compile('(?P<id>\\d+) \\(FLAGS \\((?P<data>.*?)\\)\\)', re.IGNORECASE)
    internaldate_re = re.compile('(?P<id>\\d+) \\(INTERNALDATE \\"(?P<data>.+)\\"\\)', re.IGNORECASE)

    def _parseFetchResponse(self, msg_id, regex, response):
        '''Return the 'data' group of `regex` in `response`, or None (after
        reporting it) if there is no match or the id is not `msg_id`.'''
        mo = regex.search(response)
        if mo:
            if msg_id != mo.group('id'):
                print >>sys.stderr, 'Error in response (%s != %s)' % (msg_id, mo.group('id'))
                return None
            return mo.group('data')
        print >>sys.stderr, 'Error in response: %s, %s' % (msg_id, response)
        return None

    def _fetchAttribute(self, imap_connection, msg_id, attribute, parser,
                        log = True, uid = False):
        '''Fetch `attribute` of message `msg_id`.

        Returns the parsed value if `parser` (a compiled regex) is given,
        otherwise the raw data list from imaplib.  Returns None on failure.
        If `uid` is true, `msg_id` is a UID rather than a sequence number.
        '''
        try:
            if uid:
                cmd = imap_connection.uid
                args = ('FETCH', msg_id, '(%s)' % (attribute,))
            else:
                cmd = imap_connection.fetch
                args = (msg_id, '(%s)' % (attribute,))
            (unused, att) = cmd(*args)
        except (imaplib.IMAP4.error, socket.error):
            error = sys.exc_info()[1]
            print >>sys.stderr, 'Cannot get', attribute, msg_id
            print >>sys.stderr, str(error)
            return None
        if log:
            self.state.imapLogFile.write('FETCH %s %s: %s\r\n' % (msg_id, attribute, att))
        else:
            self.state.imapLogFile.write('...DATA...\r\n')
        self.state.imapLogFile.flush()
        if parser is not None:
            # imaplib hands back [None] when the server sent no data.
            if not att or att[0] is None:
                print >>sys.stderr, 'Error in response: %s, %s' % (msg_id, att)
                return None
            return self._parseFetchResponse(msg_id, parser, att[0])
        return att

    def _normalise_name(self, name):
        '''Return `name` reduced to characters that are safe in a key.

        NOTE(review): reconstructed; see _KEY_SAFE_CHARS.
        '''
        return ''.join([char for char in name if char in self._KEY_SAFE_CHARS])

    def _get_messageinfo_database_key(self, folder_name, uid):
        norm_current_account = self._normalise_name(self.current_account)
        norm_folder_name = self._normalise_name(folder_name)
        norm_uid = self._normalise_name(uid)
        return '%s_%s_%s' % (norm_current_account, norm_folder_name.lower(), norm_uid.lower())

    def updateMessageDatabaseFolder(self, imap_connection, folder_name):
        '''Run through folder and update message status in database.'''
        i = imap_connection
        try:
            # Read-only select (i.e. EXAMINE).
            i.select(folder_name, True)
        except imaplib.IMAP4.error:
            error = sys.exc_info()[1]
            print >>sys.stderr, 'Cannot select folder', folder_name
            print >>sys.stderr, str(error)
            return None
        self.state.imapLogFile.write('Examine %s\r\n' % (folder_name,))
        self.state.imapLogFile.flush()
        try:
            (unused, ids) = i.search(None, 'UNDELETED')
        except imaplib.IMAP4.error:
            error = sys.exc_info()[1]
            print >>sys.stderr, 'Cannot search for undeleted messages in folder', folder_name
            print >>sys.stderr, str(error)
            return None
        self.state.imapLogFile.write('Search UNDELETED: %s\r\n' % (ids,))
        self.state.imapLogFile.flush()
        if not ids or not ids[0]:
            return None
        for msg_id in ids[0].split():
            if self.terminated:
                return None
            uid = self._fetchAttribute(i, msg_id, 'UID', self.uid_re)
            if uid is None:
                continue
            flags = self._fetchAttribute(i, msg_id, 'FLAGS', self.flags_re)
            if flags is None:
                continue
            db_key = self._get_messageinfo_database_key(folder_name, uid)
            msg = self._dummy_msg(db_key)
            self.state.message_info_database.load_msg(msg)
            if msg.c:
                # Already classified: only the stored details are updated.
                internaldate = [ None]
                messageText = [ None]
            else:
                internaldate = self._fetchAttribute(i, msg_id, 'INTERNALDATE', self.internaldate_re)
                if internaldate is None:
                    continue
                time_tuple = imaplib.Internaldate2tuple('INTERNALDATE "' + internaldate + '"')
                if time_tuple:
                    msg_time = time.mktime(time_tuple)
                else:
                    msg_time = None
                try:
                    initial_key = self.state.blocked_messages[self.current_account][self.initial_key]
                except KeyError:
                    error = sys.exc_info()[1]
                    if Options.options[('globals', 'verbose')]:
                        print >>sys.stdout, "Skipping, because don't know", error
                    continue
                # Skip messages that predate the first access (or whose
                # date cannot be determined).
                if msg_time is None or msg_time < initial_key:
                    continue
                messageText = self._fetchAttribute(i, msg_id, 'BODY.PEEK[]', None, log = False)
                if messageText is None:
                    continue
            self.updateMessageDatabaseMessage(uid, flags, internaldate, messageText[0], folder_name)

    def updateMessageDatabaseMessage(self, uid, flags, internaldate,
                                     message_text, folder_name):
        '''Update message database for this message.

        `message_text` is the first element of a fetch result (a sequence
        whose second element is the message), or None if the message does
        not need to be processed.
        '''
        db_key = self._get_messageinfo_database_key(folder_name, uid)
        msg = self._dummy_msg(db_key)
        self.state.message_info_database.load_msg(msg)
        # NOTE(review): reconstructed - a snapshot of the stored values, so
        # the record is only written back when something changed.
        old = dict([(att, getattr(msg, att)) for att in msg.stored_attributes])
        msg.uid = uid
        msg.flags = flags
        msg.folder_name = folder_name
        msg.internaldate = internaldate
        for att in msg.stored_attributes:
            if att not in old or getattr(msg, att) != old[att]:
                self.state.message_info_database.store_msg(msg)
                break
        if not message_text:
            return None
        message_text = message_text[1]
        self.processing_queue.put((message_text, db_key, { }, self.current_account))

    def expungeMessages(self, connection):
        '''Delete mail that the user has asked to be deleted.'''
        leftover = { }
        messages_to_delete = self.state.delete_messages[self.current_account]
        # Work on a copy of the keys; the record is replaced at the end.
        for msg_id in list(messages_to_delete.keys()):
            msg = self._dummy_msg(msg_id)
            self.state.message_info_database.load_msg(msg)
            if msg.uid == -1:
                if Options.options[('globals', 'verbose')]:
                    print >>sys.stdout, 'Skipping', msg_id, "(we don't know the UID)"
                continue
            if Options.options[('globals', 'verbose')]:
                print >>sys.stdout, 'Deleting message from server (id %s uid %s)' % (msg_id, msg.uid)
            try:
                connection.select(msg.folder_name)
                connection.uid('STORE', msg.uid, '+FLAGS.SILENT', '(\\Deleted \\Seen)')
            except imaplib.IMAP4.error:
                error = sys.exc_info()[1]
                print >>sys.stderr, 'Error occured while deleting.'
                print >>sys.stderr, str(error)
                # Keep it on the list so that it is tried again next time.
                leftover[msg_id] = { }
        self.state.delete_messages[self.current_account] = leftover
        self.state.delete_messages.store()


CHUNK_SIZE = 2048


class ChunkingIMAP4(imaplib.IMAP4):
    '''Try to avoid MemoryErrors by reading from the socket in smaller
    chunks, and then combining.'''

    def read(self, size):
        """Read 'size' bytes from remote."""
        chunks = []
        for unused in xrange(size // CHUNK_SIZE):
            chunks.append(self.file.read(CHUNK_SIZE))
        chunks.append(self.file.read(size % CHUNK_SIZE))
        data = ''.join(chunks)
        if len(data) != size:
            raise AssertionError('Chunking is wrong')
        return data


class ChunkingIMAP4_SSL(imaplib.IMAP4_SSL):
    '''Try to avoid MemoryErrors by reading from the socket in smaller
    chunks, and then combining.'''

    def read(self, size):
        """Read 'size' bytes from remote.

        Raises self.abort if the connection is closed before `size` bytes
        have arrived.
        """
        chunks = []
        read = 0
        while read < size:
            # Never ask for more than is left: anything beyond `size`
            # belongs to the next response.
            data = self.sslobj.read(min(size - read, CHUNK_SIZE))
            if not data:
                raise self.abort('socket error: EOF')
            read += len(data)
            chunks.append(data)
        return ''.join(chunks)


class SEBlockingIMAP4Proxy(SEIMAP4Proxy, IMAPFilter):
    '''Like the parent class, except that instead of marking messages
    and letting them through, only ham is let through and spam is
    blocked for later release or deletion.

    Essentially:
     o On connection, spin off a thread to filter mailboxes (much as
       the SpamBayes IMAP filter works), recording the spam status of
       each message (skipping any that have already been filtered,
       obviously).  Also download a copy of (and delete from the server)
       any spam messages to the "blocked" database.
     o Modify FETCH responses to include the classification headers.
       This behaviour isn't necessary for the software to work, so is
       not currently implemented.
     o STORE any user-identified (via the GUI) false positives back on
       the server.
     o Monitor EXAMINE and SELECT commands to keep track of which
       mailboxes are used.  This way we can ignore any non-mail
       mailboxes, such as are common with poorly configured IMAP
       servers.
     o Proxy any other commands straight through.
    '''

    def __init__(self, clientSocket, serverName, serverPort, state, ssl = False):
        self.skip_logging = None
        self.authenticating = None
        self.isClosed = False
        # send() reads self.state, and the base __init__ may already push
        # data to the client, so the state must be in place first.
        self.state = state
        self.ssl = ssl
        self.add_when_cooking = False
        self.threads = []
        ProxyClassifier.ProxyClassifier.__init__(self)
        IMAP4ProxyBase.__init__(self, clientSocket, serverName, serverPort, ssl)
        IMAPFilter.__init__(self)
        self.handlers = {
            'LOGIN': self.onLogin,
            'AUTHENTICATE': self.onAuthenticate,
            'EXAMINE': self.onSelect,
            'SELECT': self.onSelect,
            'FETCH': self.onFetch }
        self.state.totalSessions += 1
        self.state.activeSessions += 1
        self.state.proxies.append(self)

    def close(self):
        if not self.isClosed:
            self.isClosed = True
            IMAP4ProxyBase.close(self)
            # The session only counts as finished once the update threads
            # are done; wait for them without blocking the caller.
            thread.start_new_thread(self._wait_for_threads, ())

    def _wait_for_threads(self):
        '''Let the state know that we are completely done once all the
        threads are done.'''
        for update_thread in self.threads:
            if update_thread.isAlive():
                update_thread.join()
        self.threads = []
        self.state.activeSessions -= 1
        try:
            self.state.proxies.remove(self)
        except ValueError:
            pass

    def _setupAccount(self, username):
        '''Make sure the per-account records exist.  Returns True if any
        of them had to be created.'''
        self.current_account = '%s_%s_IMAP' % (username, self.serverName)
        is_new = False
        if self.current_account not in self.state.blocked_messages:
            ca = self.current_account
            self.state.blocked_messages[ca] = { }
            ik = self.initial_key
            # First access is recorded as two minutes from now.
            self.state.blocked_messages[ca][ik] = time.time() + 120
            self.state.blocked_messages.store()
            is_new = True
        if self.current_account not in self.state.delete_messages:
            self.state.delete_messages[self.current_account] = { }
            self.state.delete_messages.store()
            is_new = True
        if self.current_account not in self.state.delayed_messages:
            self.state.delayed_messages[self.current_account] = { }
            self.state.delayed_messages.store()
            is_new = True
        return is_new

    def onAuthenticate(self, command_id, command, args, response):
        """Spins off two separate threads that expires any old messages
        in the caches, and another thread to update the message database.
        Also creates an account (to hold the blocked messages) if the
        server has not been seen before, and release any 'delayed' (i.e.
        false positive) messages.  Does not do any processing of the
        AUTHENTICATE command itself."""
        return self.onLogin(command_id, command, args, response)

    def _auth_callable(self, resp):
        '''Answer the server's challenges during AUTHENTICATE (imaplib
        authobject): the user name or the password, or None for any other
        challenge.'''
        if resp[:-1].lower() == 'user name':
            return self.username
        if resp[:-1].lower() == 'password':
            return self.password
        return None

    def add_folder_to_filter(self, folder_name):
        self.filter_folders += (folder_name,)
        self.save_folders_to_filter()

    def save_folders_to_filter(self):
        '''Merge this account's folders into the saved options, stored as
        'account|folder' entries without duplicates.'''
        folders = Options.options[('imap4proxy', 'filter_folders')]
        for folder in self.filter_folders:
            folders += (self.current_account + '|' + folder,)
        folders = tuple(frozenset(folders))
        Options.options[('imap4proxy', 'filter_folders')] = folders
        Options.options.update_file(Options.optionsPathname)

    def onSelect(self, unused, unused2, args, response):
        '''Keep track of the folders that are SELECTed and EXAMINEd.'''
        if not args:
            return response
        # The command was split on whitespace, so a quoted folder name may
        # span several arguments: collect them until the quotes balance.
        quotes_seen = 0
        folder_name = []
        for arg in args:
            folder_name.append(arg)
            quotes_seen += arg.count('"')
            if quotes_seen % 2 == 0:
                break
        folder_name = ' '.join(folder_name)
        self.add_folder_to_filter(folder_name)
        return response

    def add_welcome(self, imap_connection):
        '''Put the welcome message for a new account on the server.'''
        welcomeText = self.get_blocking_welcome_message(self.username, self.serverName)
        msg = email.message_from_string(welcomeText, _class = SEHeaderMessage)
        msg_id = self.state.getNewMessageName()
        msg.setId(msg_id)
        corpus = self.state.waitingCorpus
        corpus_msg = corpus.makeMessage(msg_id, msg.as_string())
        corpus.addMessage(corpus_msg, observer_flags = storage.NO_TRAINING_FLAG)
        self.state.message_info_database.load_msg(msg)
        # A uid of -1 marks a message whose UID on the server is unknown.
        msg.uid = -1
        msg.flags = None
        msg.folder_name = 'INBOX'
        msg.internaldate = None
        self.state.message_info_database.store_msg(msg)
        self.restoreMessage(imap_connection, msg_id)

    def _release_remote_connection(self):
        '''Forget that a filtering connection is open for this account.'''
        try:
            self.state.open_remote_connections.remove(self.current_account)
        except ValueError:
            print >>sys.stderr, 'Connection was missing:', self.current_account

    def _open_filter_connection(self, command, args):
        '''Open our own, authenticated connection to the server using the
        credentials the client just logged in with.  Returns the imaplib
        connection, or None (after reporting why) if that is not possible.
        '''
        # imap_class is not set anywhere in this module; when it has not
        # been provided, pick the chunking class that matches the
        # transport.
        imap_class = getattr(self, 'imap_class', None)
        if imap_class is None:
            if self.ssl:
                imap_class = ChunkingIMAP4_SSL
            else:
                imap_class = ChunkingIMAP4
        try:
            connection = imap_class(self.serverName, self.serverPort)
            if command == 'LOGIN':
                self.username = args[0]
                self.password = args[1]
                details = (self.serverName, self.serverPort, self.username, self.password, 'imap4', self.ssl)
                self.state.model_notifier.add_connection(details)
                self.username = self.username.strip('"')
                self.password = self.password.strip('"')
                connection.login(self.username, self.password)
            else:
                print >>sys.stderr, "Can't periodic check with AUTHENTICATE"
                if len(args) < 3 or args[0].lower() != 'login':
                    print >>sys.stderr, "Can't handle login", args
                    return None
                self.username = base64.decodestring(args[1])
                self.password = base64.decodestring(args[2])
                connection.authenticate(args[0], self._auth_callable)
        except (imaplib.IMAP4.error, socket.error):
            error = sys.exc_info()[1]
            print >>sys.stderr, 'Could not login to IMAP server', getattr(self, 'username', None), self.serverName
            print >>sys.stderr, str(error)
            return None
        return connection

    def onLogin(self, command_id, command, args, response):
        """Spins off two separate threads that expires any old messages
        in the caches, and another thread to update the message database.
        Also creates an account (to hold the blocked messages) if the
        server has not been seen before, and release any 'delayed' (i.e.
        false positive) messages.  Does not do any processing of the
        LOGIN command itself.

        Always returns `response` unchanged: it is what gets sent back to
        the client."""
        SEIMAP4Proxy.onLogin(self, command_id, command, args, response, False)
        if len(args) < 2:
            return response
        if _tagged_status(response) != 'OK':
            return response
        if command == 'LOGIN':
            is_new = self._setupAccount(args[0])
        elif command == 'AUTHENTICATE':
            is_new = self._setupAccount(args[1])
        else:
            raise AssertionError('Should never get here')
        self.load_folders_to_filter()
        if self.current_account in self.state.open_remote_connections:
            if Options.options[('globals', 'verbose')]:
                print >>sys.stdout, 'Connection to', self.current_account, 'already open, so skipping retreive + restore.'
            return response
        self.state.open_remote_connections.append(self.current_account)
        i = self._open_filter_connection(command, args)
        if i is None:
            self._release_remote_connection()
            return response
        msg = 'Logging into %s:%s\r\n' % (self.serverName, self.serverPort)
        self.state.imapLogFile.write(msg)
        self.state.imapLogFile.flush()
        # The password is only needed while logging in.
        del self.password
        if is_new:
            self.add_welcome(i)
        if self.state.delayed_messages[self.current_account]:
            restored = []
            for msg_id in self.state.delayed_messages[self.current_account]:
                if Options.options[('globals', 'verbose')]:
                    print >>sys.stderr, 'Restoring', msg_id
                if self.restoreMessage(i, msg_id):
                    restored.append(msg_id)
            old = self.state.delayed_messages[self.current_account]
            for msg_id in restored:
                del old[msg_id]
            self.state.delayed_messages[self.current_account] = old
        self.expungeMessages(i)
        # updateMessageDatabase() logs out and releases the connection
        # marker when it is done.
        update_thread = threading.Thread(target = self.updateMessageDatabase, args = (i,))
        update_thread.setDaemon(True)
        self.threads.append(update_thread)
        update_thread.start()
        return response

    uidnext_re = re.compile('uidnext +(\\d+)', re.IGNORECASE)
    # \Recent cannot be set by a client, so it is removed before APPEND.
    recent_flag_re = re.compile('\\\\Recent ?', re.IGNORECASE)

    def restoreMessage(self, imap_connection, msg_id):
        '''Put a message back on the server, as it was a false positive.

        Returns True if the message was appended and found again on the
        server (so that its new UID could be recorded), False otherwise.
        '''
        msg = None
        for cache in (self.state.hamCorpus, self.state.unsureCorpus, self.state.waitingCorpus):
            msg = cache.get(msg_id)
            if msg is not None:
                break
        if msg is None:
            print >>sys.stderr, "Can't find message in cache to restore to server!"
            return False
        msg.load()
        msg.setId(msg_id)
        self.state.message_info_database.load_msg(msg)
        folder_name = msg.folder_name
        message_text = msg.as_string()
        if msg.internaldate is None or msg.internaldate == [ None]:
            date = time.localtime()
        else:
            date = 'INTERNALDATE "' + msg.internaldate + '"'
            date = imaplib.Internaldate2tuple(date)
        # NOTE(review): reconstructed - the decompiler lost this
        # expression; it restores the stored flags minus \Recent.
        flags = None
        if msg.flags:
            flags = self.recent_flag_re.sub('', msg.flags) or None

        # Selecting the folder tells us the UID the next message will get.
        next_uid = None
        try:
            imap_connection.select(folder_name)
            (unused, response) = imap_connection.response('OK')
        except imaplib.IMAP4.error:
            error = sys.exc_info()[1]
            print >>sys.stderr, 'Failed to get uidnext.', msg_id
            print >>sys.stderr, str(error)
            response = []
        for item in response or []:
            if not item:
                continue
            mo = self.uidnext_re.search(item)
            if mo:
                next_uid = mo.group(1)
                break

        # Try progressively less demanding APPENDs, stopping at the first
        # one the server accepts.
        attempts = ((flags, date),
                    (None, date),
                    (flags, imaplib.Time2Internaldate(time.time())),
                    (None, imaplib.Time2Internaldate(time.time())))
        appended = False
        for (flgs, dte) in attempts:
            try:
                imap_connection.append(folder_name, flgs, dte, message_text)
            except imaplib.IMAP4.error:
                error = sys.exc_info()[1]
                print >>sys.stderr, 'Failed putting message back on server', folder_name, flgs, dte, msg_id
                print >>sys.stderr, str(error)
            else:
                appended = True
                break
        if not appended:
            return False

        # Find the UID the server gave the message; it may take a moment to
        # show up, hence the retries.
        found_uid = None
        for unused in xrange(20):
            if next_uid:
                fetched = self._fetchAttribute(imap_connection, next_uid, 'BODY.PEEK[]', None, log = False, uid = True)
                if fetched and isinstance(fetched[0], tuple) and \
                   fetched[0][1] == message_text:
                    found_uid = next_uid
            if not found_uid:
                found_uid = self._search_uid_by_message_id(imap_connection, msg, msg_id)
            if found_uid:
                break
            imap_connection.noop()
        if not found_uid:
            print >>sys.stderr, 'Failed putting message back on server', folder_name, flags, date, msg_id
            return False
        db_key = self._get_messageinfo_database_key(folder_name, found_uid)
        msg = self._dummy_msg(db_key)
        self.state.message_info_database.load_msg(msg)
        # Make sure the restored copy is never blocked again.
        msg.c = PERSISTENT_HAM_STRING
        self.state.message_info_database.store_msg(msg)
        self.state.imapLogFile.write('Appending %s %s %s\r\n' % (folder_name, flags, date))
        self.state.imapLogFile.flush()
        return True

    def _search_uid_by_message_id(self, imap_connection, msg, msg_id):
        '''Return the UID of the only undeleted message in the selected
        folder with the same Message-ID as `msg`, or None.'''
        id_header = msg['Message-ID']
        if id_header is None:
            return None
        try:
            arg = '(UNDELETED HEADER Message-ID %s)' % (id_header,)
            (unused, response) = imap_connection.uid('SEARCH', arg)
        except imaplib.IMAP4.error:
            error = sys.exc_info()[1]
            print >>sys.stderr, 'Failed to find message.', msg_id
            print >>sys.stderr, str(error)
            return None
        if response and response[0]:
            matches = response[0].split()
        else:
            matches = []
        if len(matches) == 1:
            return matches[0]
        print >>sys.stderr, 'Found wrong number of messages', response
        return None